@zprobst zprobst commented Aug 26, 2025

This PR refactors the pipeline code to accommodate and implement a record lineage tracking system. The goal is to enable steps, often extractors, to free up resources associated with a yielded record or to acknowledge back to source systems that a record was truly and completely processed.

Refactor

The current pipeline code is brittle and hard to change because it is implemented as a series of procedures and is not well encapsulated. To add lineage tracking, we need to make some cross-cutting changes, so the pipeline code is refactored first.

The refactor builds on the observation that the previous code was written procedurally so that a step could transition carefully from one state to the next. To remove the procedures, the pipeline now runs an explicit state machine for each step in the pipeline, as well as for the pipeline output.

Steps transition through a state flow that looks like this:

StartStepState -> ProcessRecordsState -> EmitOutstandingRecordsState -> StopStepExecution

and the pipeline output progresses via a state flow like this:

PipelineOutputStartState -> PipelineOutputProcessRecordsState -> PipelineOutputStopState

This means that both steps and the pipeline output use the same executor pattern, simplifying the overall architecture.
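As a rough illustration of the shared executor pattern, here is a minimal sketch in which each state returns the next state (or `None` to stop) and a single loop drives the chain. All class names here (`SketchState`, `SketchExecutor`, and so on) are hypothetical and are not the classes introduced by this PR.

```python
import asyncio
from typing import List, Optional


class SketchState:
    """One state in a flow; returns the next state, or None when finished."""

    async def execute(self) -> Optional["SketchState"]:
        raise NotImplementedError


class StopState(SketchState):
    def __init__(self, log: List[str]):
        self.log = log

    async def execute(self) -> Optional[SketchState]:
        self.log.append("stop")
        return None  # no further state: the executor loop exits


class ProcessState(SketchState):
    def __init__(self, log: List[str], records: list):
        self.log, self.records = log, records

    async def execute(self) -> Optional[SketchState]:
        self.log.append(f"process {len(self.records)} records")
        return StopState(self.log)


class StartState(SketchState):
    def __init__(self, log: List[str], records: list):
        self.log, self.records = log, records

    async def execute(self) -> Optional[SketchState]:
        self.log.append("start")
        return ProcessState(self.log, self.records)


class SketchExecutor:
    """Drives any chain of states, whether for a step or for the output."""

    def __init__(self, initial: SketchState):
        self.state: Optional[SketchState] = initial

    async def run(self) -> None:
        while self.state is not None:
            self.state = await self.state.execute()


log: List[str] = []
asyncio.run(SketchExecutor(StartState(log, [1, 2, 3])).run())
print(log)
```

Because the loop only knows about "run the current state, take whatever it returns", the same executor can drive both flows; only the state classes differ.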

Lineage Tracking

The highlight of this PR is record lineage tracking. This, in short, builds a tree of the intermediate and final records produced from every record a step emits. In other words, we track the parent-child relationships of every record in and out of every step. With this information, we know when nodestream is 'done' processing a record and can trigger a callback to the originating step when it's appropriate. All steps can use this by following the same pattern. As an example, two extractors are provided below:

from nodestream.pipeline import Extractor


class TracksLineageWithTheOriginalRecord(Extractor):
    async def finalize_record(self, record):
        # Called once the record has been fully processed downstream.
        print("The following record was returned to me", record)

    async def extract_records(self):
        for i in range(10000):
            yield i


class TracksLineageWithSomeToken(Extractor):
    def __init__(self, items):
        self.items = items

    async def finalize_record(self, token):
        # `token` is the second element of the tuple yielded below.
        print("The following record was returned to me", self.items[token])

    async def extract_records(self):
        for index, item in enumerate(self.items):
            # Yielding a tuple: the second item is what is sent back to `finalize_record`.
            yield item, index
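To make the parent-child bookkeeping concrete, here is a minimal sketch of how a lineage tree could decide when a record is 'done'. It assumes a record is finalized only after the pipeline has released it and all of its descendants have been finalized. `LineageNode` and `RecordingStep` are hypothetical names for illustration, not the PR's implementation.

```python
import asyncio


class RecordingStep:
    """Hypothetical step that records each finalize_record call."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    async def finalize_record(self, token):
        self.log.append((self.name, token))


class LineageNode:
    """Tracks one record, its parent, and how many children are unfinished."""

    def __init__(self, data, origin, token=None, parent=None):
        self.data = data
        self.origin = origin  # the step that produced this record
        self.token = data if token is None else token
        self.parent = parent
        self.live_children = 0
        self.released = False
        self.finalized = False
        if parent is not None:
            parent.live_children += 1

    async def drop(self):
        """The pipeline is done handling this record itself."""
        self.released = True
        await self._maybe_finalize()

    async def _maybe_finalize(self):
        if self.finalized or not self.released or self.live_children:
            return
        self.finalized = True
        await self.origin.finalize_record(self.token)
        if self.parent is not None:
            # Tell the parent that one more descendant has finished.
            self.parent.live_children -= 1
            await self.parent._maybe_finalize()


async def demo():
    log = []
    extractor = RecordingStep("extractor", log)
    transformer = RecordingStep("transformer", log)
    root = LineageNode("row", extractor, token=7)
    child_a = LineageNode("row-a", transformer, parent=root)
    child_b = LineageNode("row-b", transformer, parent=root)
    await root.drop()     # released, but two children are still in flight
    await child_a.drop()
    await child_b.drop()  # last descendant done, so the root is finalized
    return log


lineage_log = asyncio.run(demo())
print(lineage_log)
```

In this sketch the extractor only hears about token `7` after both derived records have been finalized, which is the property that lets an extractor safely acknowledge a message back to its source.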

Bug Fixes

As a miscellany, this PR also fixes two minor issues with exception handling in the pipeline logic.

  • Fixes a race condition where, if an exception is thrown before the on_start callback is triggered, the CLI may not have started the progress spinner that reports error messages. This leads to a separate error during the crash that obscures the original error. The fix is to ensure that on_start is explicitly executed before actual pipeline processing begins.
  • Fixes an error where it was impossible for the CLI to throw an exception in order to emit a status code other than zero. The hook provided to the CLI for this (on_finish) was wrapped in a try/except block. That block was not required because, by the time the pipeline executes this code, there is nothing else to do, so there is no reason to swallow the exception to protect the integrity of the pipeline. The fix was simply to remove the try/except.
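The two fixes above can be sketched as an ordering rule plus an unguarded hook call. This is a simplified illustration under stated assumptions: `SketchReporter`, `PipelineFailed`, and `run_pipeline` are hypothetical stand-ins, and only the `on_start` and `on_finish` hook names come from the description.

```python
import asyncio


class PipelineFailed(Exception):
    """Hypothetical exception the CLI raises to force a non-zero exit."""


class SketchReporter:
    """Hypothetical stand-in for the CLI's progress reporter."""

    def __init__(self):
        self.events = []
        self.errors = 0

    def on_start(self):
        self.events.append("start")  # e.g. start the progress spinner

    def on_error(self, exc):
        self.errors += 1
        self.events.append(f"error: {exc}")

    def on_finish(self):
        self.events.append("finish")
        if self.errors:
            raise PipelineFailed(f"{self.errors} error(s)")


async def run_pipeline(reporter, work):
    reporter.on_start()  # explicitly first, before any processing begins
    try:
        await work()
    except Exception as exc:
        reporter.on_error(exc)
    reporter.on_finish()  # deliberately not wrapped in try/except


async def failing_work():
    raise RuntimeError("boom")


reporter = SketchReporter()
outcome = None
try:
    asyncio.run(run_pipeline(reporter, failing_work))
except PipelineFailed as exc:
    outcome = str(exc)  # the caller sees the failure and can exit non-zero
print(reporter.events, outcome)
```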

Because of this behavior change, we'll likely need to make a 0.15 release.

@zprobst zprobst requested a review from ccloes as a code owner August 26, 2025 21:40
@zprobst zprobst marked this pull request as draft August 26, 2025 21:42

codecov bot commented Aug 28, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.30%. Comparing base (f0d69f4) to head (f3a3aa5).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #435      +/-   ##
==========================================
+ Coverage   98.26%   98.30%   +0.03%     
==========================================
  Files         152      152              
  Lines        6171     6247      +76     
==========================================
+ Hits         6064     6141      +77     
+ Misses        107      106       -1     
Flag Coverage Δ
3.10-macos-latest 98.27% <100.00%> (+0.03%) ⬆️
3.10-ubuntu-latest 98.27% <100.00%> (+0.03%) ⬆️
3.10-windows-latest 98.27% <100.00%> (+0.03%) ⬆️
3.11-macos-latest 98.28% <100.00%> (+0.03%) ⬆️
3.11-ubuntu-latest 98.27% <100.00%> (+0.03%) ⬆️
3.11-windows-latest 98.27% <100.00%> (+0.03%) ⬆️
3.12-macos-latest 98.28% <100.00%> (+0.03%) ⬆️
3.12-ubuntu-latest 98.27% <100.00%> (+0.03%) ⬆️
3.12-windows-latest 98.27% <100.00%> (+0.03%) ⬆️
3.13-macos-latest 98.28% <100.00%> (+0.03%) ⬆️
3.13-ubuntu-latest 98.27% <100.00%> (+0.03%) ⬆️
3.13-windows-latest 98.27% <100.00%> (+0.03%) ⬆️

@zprobst zprobst marked this pull request as ready for review September 26, 2025 12:31
@zprobst zprobst requested review from cbadke and jbristow September 26, 2025 12:31

@cbadke cbadke left a comment

I'm really excited to see how all this is coming together. Just a few notes and questions.

Comment on lines 109 to 110
tracks_lineage: bool = False

Would there be any developer benefit to encapsulating this as a new subclass of Step? Then finalize_record could only be declared on that interface?

I feel like this could avoid confusion for Step developers that don't need this finalizing behaviour since if that boolean is false then finalize_record is never called.

class FinalizingStep(Step):
    async def finalize_record(self, callback_token: object):
        """Finalize a record.

        This method is called when a record produced by this step has been
        fully processed by all downstream steps. It is not called for records
        that are not produced by this step.
        """
        pass

Member Author

@zprobst zprobst Sep 29, 2025

Interesting idea... thought about it a bit. Right now we have class hierarchies that look like this:

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]

Let's assume that we want to add finalization to our MyAwesomeTransformer by inheriting from FinalizingStep. We'd need to have a class hierarchy like this:

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]
    D[FinalizingStep] --> C
    A --> D

This creates a... confusing class hierarchy and can lead to weird MRO issues.

Then imagine we have an ApronSpringsStep that gets notified every time we operate on a given record.

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]
    D[FinalizingStep] --> C
    A --> D
    E[ApronSpringsStep] --> C
    A --> E

This violates my personal rule of keeping class hierarchies relatively shallow and flat. The more cases we add to this example, the more it feels like it's really the same case, with the implementer of Step choosing to do something or not depending on the situation.
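For reference, the diamond-shaped hierarchy discussed here can be reproduced with empty stand-in classes to see the method resolution order Python computes. The classes below are placeholders that only mirror the names in the diagrams; they are not the real nodestream classes.

```python
class Step:
    """Placeholder base class."""


class Transformer(Step):
    """Placeholder for the existing Transformer base."""


class FinalizingStep(Step):
    """Placeholder for the proposed finalizing mixin."""


class ApronSpringsStep(Step):
    """Placeholder for a second, imagined mixin."""


class MyAwesomeTransformer(Transformer, FinalizingStep, ApronSpringsStep):
    """A user step that opts into both behaviors via inheritance."""


mro_names = [cls.__name__ for cls in MyAwesomeTransformer.__mro__]
print(mro_names)
```

Each added mixin lengthens the resolution chain, and the order of the base classes in the `class` statement starts to matter for any `super()` calls, which is the kind of subtlety a flat hierarchy avoids.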

Yep, that's a really good point. As I look at this, it feels like Finalizing is more of a Protocol than a subclass. Would that feel any better?

It's a bit of an abuse because utilizing the protocol changes the framework's treatment of Step outputs, so maybe it's still not a great idea. I think I'm just trying to address the bad feeling of a boolean behavior flag and a method that is unimportant to most use cases.

I leave it to your judgement on how you want that interface and experience to work.
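For illustration only, the Protocol idea could look roughly like the sketch below, using `typing.Protocol` with `runtime_checkable` so the framework can detect the capability structurally. `Finalizing`, `PlainStep`, `AckingStep`, and `wants_finalization` are hypothetical names, not part of nodestream.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Finalizing(Protocol):
    """Anything that exposes finalize_record satisfies this protocol."""

    async def finalize_record(self, callback_token: object) -> None: ...


class PlainStep:
    """A step that does not care about finalization."""

    async def process_record(self, record):
        yield record


class AckingStep(PlainStep):
    """A step that opts in simply by defining the method."""

    async def finalize_record(self, callback_token: object) -> None:
        print("acknowledged", callback_token)


def wants_finalization(step: object) -> bool:
    # runtime_checkable protocols only check that the method exists,
    # not its signature or whether it is async.
    return isinstance(step, Finalizing)


print(wants_finalization(AckingStep()), wants_finalization(PlainStep()))
```

This keeps the class hierarchy flat, at the cost of the framework changing its behavior based on a structural check, which is the "abuse" mentioned above.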

Member Author

I think I've come down on the side that there isn't really a need to introduce a protocol or subclass to avoid the bools. Not having it in this case is the same as doing nothing. For Step, there is always a reasonable default implementation that we can rely on. In this case it's just pass.
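A minimal sketch of that default-implementation approach, assuming the base class ships a no-op `finalize_record` that the framework can always call. `SketchStep`, `QueueExtractor`, and `finalize_all` are hypothetical names used only for this example.

```python
import asyncio


class SketchStep:
    """Hypothetical base class with a do-nothing default."""

    async def finalize_record(self, callback_token: object) -> None:
        pass  # nothing to release or acknowledge by default


class QueueExtractor(SketchStep):
    """A step that overrides the default to acknowledge its source."""

    def __init__(self):
        self.acked = []

    async def finalize_record(self, callback_token: object) -> None:
        self.acked.append(callback_token)


async def finalize_all(steps, token):
    for step in steps:
        # Always safe to call: no flag and no isinstance check needed.
        await step.finalize_record(token)


plain, queue = SketchStep(), QueueExtractor()
asyncio.run(finalize_all([plain, queue], "msg-1"))
print(queue.acked)
```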

Comment on lines 19 to 22
@dataclass(slots=True)
class Record:
    """A `Record` is a unit of data that is processed by a pipeline."""

This is only a framework facing class so maybe not critical but I wonder if naming this Record will cause some confusion. As I'm reading through the code, I'm realizing that throughout the system record is used to refer to the input and output of Steps. But now we have a new Record class where that object that is often called a record is the data property of this class. Could get confusing for folks not deeply entrenched in the framework?

Member Author

That's a good point... I'll workshop a better name.

Member Author

I've gone with RecordContext

Comment on lines 54 to 56
data = callback_token = emission
if isinstance(emission, tuple) and step.tracks_lineage:
    data, callback_token = emission

Would it be beneficial for callback_token to be None when step.tracks_lineage is False? That would make it explicitly clear that no token was actually communicated out from the step.

Member Author

My pendulum has swung the other way on this. I think always calling it, without a flag, is the most predictable pattern.

Make sure to update docs and advertise the breaking change that any tuple returned from a step will have the last element stripped off.
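To show why this is a breaking change, here is a sketch of the unpacking from the snippet above with the `tracks_lineage` check removed. It is an assumption about the final behavior rather than the merged code, and `split_emission` is a hypothetical helper name.

```python
def split_emission(emission):
    """Return (data, callback_token) for one value yielded by a step."""
    data = callback_token = emission
    if isinstance(emission, tuple):
        # Assumption: every tuple is treated as (data, callback_token).
        data, callback_token = emission
    return data, callback_token


# A non-tuple record is its own callback token.
plain = split_emission({"name": "alice"})

# A step that meant to emit the tuple ("alice", 30) as data now loses
# its second element to the callback token.
surprised = split_emission(("alice", 30))

# One way to keep emitting a tuple as data: pair it with an explicit token.
wrapped = split_emission((("alice", 30), "token-1"))

print(plain, surprised, wrapped)
```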

self.step_outbox_size, current_output_name, current_input_name
)
pipeline_output = PipelineOutput(current_input, reporter)
executors.append(Executor.pipeline_output(current_input, reporter))

I might be misremembering but I thought we needed the reporter to be first in the list in order to fix the race?

Member Author

I think it is? On line 484 we create an empty list, and line 497 is the first place we append to it, so it will have position 0. Do you think it's better if we create it there to be a touch more clear about that? Something like:

executors = [Executor.pipeline_output(current_input, reporter)]

Does that seem more appropriate?

I see. I was completely misinterpreting this section. I think I'm not fully grokking all of the Executor abstractions and channel management, and that was clouding my read of it.

Member Author

All good... this PR is doing a lot in it.

Comment on lines 39 to 49
async def test_record_from_step_emission_tuple_data():
    """Test Record.from_step_emission with tuple (data, token)."""
    step = Mock(spec=Step)
    data = {"test": "data"}
    token = "callback_token"

    record = Record.from_step_emission(step, (data, token))

    assert record.data == data
    assert record.callback_token == token
    assert record.originating_step == step

How does this test pass if tracks_lineage isn't set on the Step? This seems like an issue with a number of tests in this file?

Maybe Mock defaults bools to True? If so, I feel like it would be clearer to be explicit about that value.
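A likely explanation, shown as a small standalone sketch: an attribute read from `Mock(spec=...)` is itself a `Mock` object, which is truthy, so `step.tracks_lineage` passes an `if` check even though the real default is `False`. `SketchStep` below is a hypothetical stand-in for the real `Step` class.

```python
from unittest.mock import Mock


class SketchStep:
    """Hypothetical stand-in with the flag from the reviewed snippet."""

    tracks_lineage: bool = False


implicit = Mock(spec=SketchStep)
# The spec only restricts which attribute names exist; the value read back
# is a child Mock, not the class default of False.
implicit_is_mock = isinstance(implicit.tracks_lineage, Mock)
implicit_truthy = bool(implicit.tracks_lineage)

# Being explicit makes the test state what it relies on.
explicit = Mock(spec=SketchStep, tracks_lineage=False)

print(implicit_is_mock, implicit_truthy, explicit.tracks_lineage)
```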

# Should not raise any exceptions and should do nothing


@pytest.mark.asyncio

If I'm not mistaken, I think this test just tests that you can call a function. The mock_finalize is defined, attached to the step object, and then called. I'm not sure this is testing anything.

Member Author

The tests are probably pretty bad. I'll go through them.



@pytest.mark.asyncio
async def test_finalize_record_async_behavior():

Is this actually testing that Record.drop() calls the finalize with await?
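One way to assert that the coroutine was actually awaited, rather than merely called, is `AsyncMock` and its await-tracking attributes. The sketch below uses a hypothetical `SketchRecord` with a `drop()` method as a stand-in; the real class and its constructor may differ.

```python
import asyncio
from unittest.mock import AsyncMock, Mock


class SketchRecord:
    """Hypothetical stand-in for the framework's record wrapper."""

    def __init__(self, data, callback_token, originating_step):
        self.data = data
        self.callback_token = callback_token
        self.originating_step = originating_step

    async def drop(self):
        # The behavior under test: finalize_record must be awaited.
        await self.originating_step.finalize_record(self.callback_token)


step = Mock()
step.finalize_record = AsyncMock()

record = SketchRecord({"id": 1}, "token-1", step)
asyncio.run(record.drop())

# await_count only increases when the returned coroutine is awaited.
step.finalize_record.assert_awaited_once_with("token-1")
print(step.finalize_record.await_count)
```

If `drop()` called `finalize_record` without `await`, `call_count` would still be 1 but `await_count` would stay at 0, so the assertion would fail.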

@zprobst zprobst changed the base branch from main to 0.15 October 3, 2025 13:32

zprobst commented Oct 3, 2025

I think that, due to the complexity and nuance in this change, we are due for merging this into a new breaking release. I see this as a good opportunity to make some other small changes we need to make, à la #407.
